package com.ld.shieldsb.canalclient.util;

import java.net.InetSocketAddress;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.Month;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Date;
import java.util.List;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;

import javax.sql.DataSource;

import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.impl.SimpleCanalConnector;
import com.alibaba.otter.canal.protocol.exception.CanalClientException;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.ld.shieldsb.canalclient.client.AbstractCanalClient;
import com.ld.shieldsb.common.core.model.Result;
import com.ld.shieldsb.common.core.util.ResultUtil;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class CanalUtil {

    private static final Logger logger = LoggerFactory.getLogger(CanalUtil.class);

    /**
     * Runs a query on a connection borrowed from the data source and hands the open
     * result set to the callback.
     *
     * @param ds
     *            data source supplying the connection
     * @param sql
     *            query text, executed as given
     * @param fun
     *            callback applied to the open result set
     * @return whatever {@code fun} returns
     * @throws RuntimeException
     *             wrapping any exception raised while connecting, querying or inside {@code fun}
     */
    public static Object sqlRS(DataSource ds, String sql, Function<ResultSet, Object> fun) {
        try (Connection connection = ds.getConnection()) {
            try (Statement statement = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
                // NOTE(review): Integer.MIN_VALUE looks like the MySQL Connector/J row-streaming hint — confirm for the driver in use
                statement.setFetchSize(Integer.MIN_VALUE);
                try (ResultSet rows = statement.executeQuery(sql)) {
                    return fun.apply(rows);
                }
            }
        } catch (Exception failure) {
            // The cause travels with the rethrown exception; only the statement text is logged here.
            logger.error("sqlRs has error, sql: {} ", sql);
            throw new RuntimeException(failure);
        }
    }

    /**
     * Runs a parameterized query on a connection borrowed from the data source and hands
     * the open result set to the callback.
     *
     * @param ds
     *            data source supplying the connection
     * @param sql
     *            query text with {@code ?} placeholders
     * @param values
     *            placeholder values bound in list order starting at index 1; may be {@code null} for none
     * @param fun
     *            callback applied to the open result set
     * @return whatever {@code fun} returns
     * @throws RuntimeException
     *             wrapping any exception raised while connecting, binding, querying or inside {@code fun}
     */
    public static Object sqlRS(DataSource ds, String sql, List<Object> values, Function<ResultSet, Object> fun) {
        try (Connection connection = ds.getConnection();
                PreparedStatement statement = connection.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY,
                        ResultSet.CONCUR_READ_ONLY)) {
            // NOTE(review): Integer.MIN_VALUE looks like the MySQL Connector/J row-streaming hint — confirm for the driver in use
            statement.setFetchSize(Integer.MIN_VALUE);
            if (values != null) {
                int position = 1; // JDBC parameter indexes are 1-based
                for (Object value : values) {
                    statement.setObject(position++, value);
                }
            }
            try (ResultSet rows = statement.executeQuery()) {
                return fun.apply(rows);
            }
        } catch (Exception failure) {
            // The cause travels with the rethrown exception; only the statement text is logged here.
            logger.error("sqlRs has error, sql: {} ", sql);
            throw new RuntimeException(failure);
        }
    }

    /**
     * Executes a query on an existing connection and passes the result set to the callback.
     * <p>
     * The connection is not closed by this method. A {@link SQLException} is logged and
     * not rethrown, so the caller gets no failure signal.
     *
     * @param conn
     *            open SQL connection, owned by the caller
     * @param sql
     *            query text, executed as given
     * @param consumer
     *            callback invoked with the open result set
     */
    public static void sqlRS(Connection conn, String sql, Consumer<ResultSet> consumer) {
        try (Statement statement = conn.createStatement()) {
            try (ResultSet rows = statement.executeQuery(sql)) {
                consumer.accept(rows);
            }
        } catch (SQLException sqlError) {
            logger.error(sqlError.getMessage(), sqlError);
        }
    }

    /**
     * Strips quoting characters from a column name: backticks, single quotes and
     * double quotes are removed wherever they occur.
     *
     * @Title cleanColumn
     * @author 吕凯
     * @date 2021-12-06 09:40:44
     * @param column
     *            column name, possibly quoted; may be {@code null}
     * @return the name without quoting characters, or {@code null} when {@code column} is {@code null}
     */
    public static String cleanColumn(String column) {
        if (column == null) {
            return null;
        }
        // Literal replace: no regex is compiled, and a string without the character comes back unchanged.
        return column.replace("`", "").replace("'", "").replace("\"", "");
    }

    /**
     * Creates a pool with exactly {@code nThreads} workers and a {@link SynchronousQueue}
     * hand-off, so no task is buffered.
     * <p>
     * When every worker is busy the rejection handler blocks the submitting thread until a
     * worker takes the task. A task submitted after shutdown is dropped silently. If the
     * submitting thread is interrupted while waiting, the task is dropped and the thread's
     * interrupt flag is restored so the caller can observe it.
     *
     * @param nThreads
     *            core and maximum pool size
     * @param keepAliveTime
     *            keep-alive time in milliseconds, passed to the executor
     * @return the configured executor
     */
    public static ThreadPoolExecutor newFixedThreadPool(int nThreads, long keepAliveTime) {
        return new ThreadPoolExecutor(nThreads, nThreads, keepAliveTime, TimeUnit.MILLISECONDS, new SynchronousQueue<>(), (r, exe) -> {
            if (!exe.isShutdown()) {
                try {
                    // Block the submitter until a worker is free to take the task.
                    exe.getQueue().put(r);
                } catch (InterruptedException e) {
                    // put() cleared the flag when it threw; set it again instead of hiding the interrupt.
                    Thread.currentThread().interrupt();
                }
            }
        });
    }

    /**
     * Creates a single-worker pool with a {@link SynchronousQueue} hand-off, so no task is
     * buffered.
     * <p>
     * When the worker is busy the rejection handler blocks the submitting thread until the
     * worker takes the task. A task submitted after shutdown is dropped silently. If the
     * submitting thread is interrupted while waiting, the task is dropped and the thread's
     * interrupt flag is restored so the caller can observe it.
     *
     * @param keepAliveTime
     *            keep-alive time in milliseconds, passed to the executor
     * @return the configured executor
     */
    public static ThreadPoolExecutor newSingleThreadExecutor(long keepAliveTime) {
        return new ThreadPoolExecutor(1, 1, keepAliveTime, TimeUnit.MILLISECONDS, new SynchronousQueue<>(), (r, exe) -> {
            if (!exe.isShutdown()) {
                try {
                    // Block the submitter until the worker is free to take the task.
                    exe.getQueue().put(r);
                } catch (InterruptedException e) {
                    // put() cleared the flag when it threw; set it again instead of hiding the interrupt.
                    Thread.currentThread().interrupt();
                }
            }
        });
    }

    // Cache of DateTimeFormatter instances keyed by their pattern string. A formatter is built
    // with DateTimeFormatter.ofPattern the first time its pattern is requested. No size limit
    // or expiry is configured on the builder.
    private static LoadingCache<String, DateTimeFormatter> dateFormatterCache = CacheBuilder.newBuilder()
            .build(new CacheLoader<String, DateTimeFormatter>() {

                // Invoked by the cache when a pattern has no entry yet.
                @Override
                public DateTimeFormatter load(String key) {
                    return DateTimeFormatter.ofPattern(key);
                }
            });

    /**
     * Parses a date, time or date-time string into a {@link Date} in the system default
     * time zone. The pattern is chosen from the characters present in the trimmed input:
     * <ul>
     * <li>{@code -} and {@code :} — {@code yyyy-MM-dd HH:mm}, {@code yyyy-MM-dd HH:mm:ss}, or with a
     * {@code .} {@code yyyy-MM-dd HH:mm:ss.S…} with one {@code S} per character after the first dot</li>
     * <li>{@code -} only — {@code yyyy-MM-dd}, resolved to the start of that day</li>
     * <li>{@code :} only — {@code HH:mm}, {@code HH:mm:ss} or {@code HH:mm:ss.S…}, placed on 1970-01-01</li>
     * </ul>
     *
     * @param datetimeStr
     *            text to parse; may be {@code null} or empty
     * @return the parsed date, or {@code null} when the input is empty, matches none of the shapes
     *         above, or fails to parse (the failure is logged, not thrown)
     */
    public static Date parseDate2(String datetimeStr) {
        if (StringUtils.isEmpty(datetimeStr)) {
            return null;
        }
        try {
            String text = datetimeStr.trim();
            boolean hasDate = text.contains("-");
            boolean hasTime = text.contains(":");
            boolean hasFraction = text.contains(".");

            if (hasDate && hasTime) {
                // Date + time, optionally with fractional seconds.
                String timePart = hasFraction ? fractionalTimePattern(text) : clockPattern(text);
                LocalDateTime dateTime = LocalDateTime.parse(text, dateFormatterCache.get("yyyy-MM-dd " + timePart));
                return toSystemDate(dateTime);
            }
            if (hasDate) {
                // Date only.
                LocalDate date = LocalDate.parse(text, dateFormatterCache.get("yyyy-MM-dd"));
                return toSystemDate(date.atStartOfDay());
            }
            if (hasTime) {
                // Time only: anchor it to the epoch day.
                String timePart = hasFraction ? fractionalTimePattern(text) : clockPattern(text);
                LocalTime time = LocalTime.parse(text, dateFormatterCache.get(timePart));
                return toSystemDate(LocalDateTime.of(LocalDate.of(1970, Month.JANUARY, 1), time));
            }
        } catch (Throwable e) {
            logger.error(e.getMessage(), e);
        }

        return null;
    }

    /** Builds {@code HH:mm:ss.} followed by one {@code S} per character after the first dot in {@code text}. */
    private static String fractionalTimePattern(String text) {
        int fractionDigits = text.length() - text.indexOf(".") - 1;
        StringBuilder pattern = new StringBuilder("HH:mm:ss.");
        for (int n = 0; n < fractionDigits; n++) {
            pattern.append('S');
        }
        return pattern.toString();
    }

    /** Returns {@code HH:mm:ss} when {@code text} has a second colon, otherwise {@code HH:mm}. */
    private static String clockPattern(String text) {
        int firstColon = text.indexOf(":");
        boolean hasSeconds = text.indexOf(":", firstColon + 1) > -1;
        return hasSeconds ? "HH:mm:ss" : "HH:mm";
    }

    /** Converts a local date-time to a {@link Date} using the system default zone. */
    private static Date toSystemDate(LocalDateTime dateTime) {
        return Date.from(dateTime.atZone(ZoneId.systemDefault()).toInstant());
    }

    /**
     * Tests whether a Canal server can be reached with the given settings. Builds a
     * single-node connector and delegates to {@link #testConn(CanalConnector)}.
     *
     * @param hostname
     *            server host
     * @param port
     *            server port
     * @param destination
     *            Canal destination (instance) name
     * @param username
     *            user name passed to the connector
     * @param password
     *            password passed to the connector
     * @return the result produced by {@link #testConn(CanalConnector)}
     */
    public static Result testConn(String hostname, int port, String destination, String username, String password) {
        CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress(hostname, port), destination, username,
                password);
        return testConn(canalConnector);
    }

    /**
     * Tests the connection held by a Canal client.
     *
     * @param client
     *            client whose connector is tested; may be {@code null}
     * @return an error result when the client or its connector is {@code null}, otherwise the
     *         result of {@link #testConn(CanalConnector)}
     */
    public static Result testConn(AbstractCanalClient client) {
        if (client != null) {
            if (client.getConnector() != null) {
                return testConn(client.getConnector());
            }
            return ResultUtil.error("连接为空");
        }
        return ResultUtil.error("客户端为空");
    }

    /**
     * Tests a connector by connecting, subscribing and disconnecting.
     * <p>
     * A {@link CanalClientException} is turned into a failed result whose message is chosen
     * from the exception text. On failure a best-effort disconnect is attempted, so a
     * connection that was opened before {@code subscribe()} failed is not left open.
     *
     * @param canalConnector
     *            connector to test; may be {@code null}
     * @return a successful result when all three steps complete, otherwise a failed result
     *         with a user-facing message
     */
    public static Result testConn(CanalConnector canalConnector) {
        if (canalConnector == null) {
            return ResultUtil.error("连接为空");
        }
        Result result = new Result();
        result.setSuccess(true);
        // Only SimpleCanalConnector is given the socket timeout here; other implementations are used as-is.
        if (canalConnector instanceof SimpleCanalConnector) {
            // 5-second timeout; expiry surfaces as "Caused by: java.net.SocketTimeoutException".
            ((SimpleCanalConnector) canalConnector).setSoTimeout(5 * 1000);
        }

        try {
            canalConnector.connect();
            canalConnector.subscribe(); // subscribe
            canalConnector.disconnect();
        } catch (CanalClientException e1) {
            // Port not listening: java.net.ConnectException: Connection refused: connect
            // Peer is not a socket service: times out, Caused by: java.net.SocketTimeoutException
            // Instance not started: failed to subscribe with reason: something goes wrong with channel:[...],
            // exception=com.alibaba.otter.canal.server.exception.CanalServerException: destination:mysql2451 should
            // start first
            String reason = e1.getMessage() == null ? "" : e1.getMessage();
            if (reason.contains("should start first")) {
                result.setMessage("实例未启动！");
            } else if (reason.contains("auth failed for user")) {
                result.setMessage("用户名密码错误！");
            } else if (reason.contains("java.net.SocketTimeoutException")) {
                result.setMessage("连接不能访问！");
            } else {
                result.setMessage("客户端连接错误，请检查ip或端口是否错误！");
            }
            logger.warn("客户端连接错误！", e1);

            result.setSuccess(false);

            // connect() may have succeeded before subscribe() failed; release the connection.
            try {
                canalConnector.disconnect();
            } catch (Exception cleanupError) {
                // Best-effort cleanup: the original failure is what gets reported.
                logger.debug("disconnect after failed connection test also failed", cleanupError);
            }
        }
        return result;
    }
}
